package org.yunai.swjg.server.core.service;

import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.session.IoSession;
import org.slf4j.Logger;
import org.yunai.swjg.server.core.GameServerRuntime;
import org.yunai.swjg.server.core.message.GameSessionClosedMessage;
import org.yunai.swjg.server.core.message.GameSessionOpenedMessage;
import org.yunai.swjg.server.core.session.GameSession;
import org.yunai.yfserver.common.LoggerFactory;
import org.yunai.yfserver.message.sys.SysInternalMessage;
import org.yunai.yfserver.plugin.mina.service.AbstractMinaIoHandler;
import org.yunai.yfserver.server.IMessageProcessor;
import org.yunai.yfserver.util.Assert;

import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * 游戏服务器IoHandler实现
 * User: yunai
 * Date: 13-3-27
 * Time: 上午12:20
 */
public class GameServerIoHandler
        extends AbstractMinaIoHandler<GameSession> {

    private static final Logger LOGGER_GAME = LoggerFactory.getLogger(LoggerFactory.Logger.game, GameServerIoHandler.class);
    private static final Logger LOGGER_MSG = LoggerFactory.getLogger(LoggerFactory.Logger.msg, GameServerIoHandler.class);

    private final GameExecutorService gameExecutorService;

    /**
     * 被暂停读取的session集合<br />
     * 当消息队列满的时候，会将之后有消息的session放入该集合
     */
    private volatile ConcurrentMap<IoSession, Boolean> suspendReadSessions = new  ConcurrentHashMap<IoSession, Boolean>();
    /**
     * 标记当前checkSuspendReadSessionRunner是否正在检查
     */
    private volatile AtomicBoolean checking = new AtomicBoolean(false);
    /**
     * checkSuspendReadSessionRunner的执行周期
     */
    private final long checkPeriod;
    /**
     * 检查被暂停读取的session的Runnable实现.<br />
     * 它会一定时间将session从suspendReadSessions移除
     */
    private final Runnable CHECK_SUSPEND_READ_SESSION_RUNNER = new Runnable() {
        @Override
        public void run() {
            if (suspendReadSessions.isEmpty() || msgProcessor.isFull() || checking.compareAndSet(false, true)) {
                return;
            }
            try {
                Iterator<IoSession> sessionIterator = suspendReadSessions.keySet().iterator();
                while (sessionIterator.hasNext()) {
                    IoSession session = sessionIterator.next();
                    sessionIterator.remove();

                    if (session.isConnected()) {
                        session.resumeRead();
                        LOGGER_GAME.debug("[CHECK_SUSPEND_READ_SESSION_RUNNER] [session.resumeRead({})].", session.getId());
                    }
                }
            } finally {
                checking.set(false);
            }
        }
    };

    /**
     * @param msgProcessor 消息处理器
     * @param gameExecutorService 系统线程池服务器
     * @param checkPeriod 检查频率, 单位（毫秒）.{@link GameServerIoHandler#CHECK_SUSPEND_READ_SESSION_RUNNER}
     */
    public GameServerIoHandler(IMessageProcessor msgProcessor, GameExecutorService gameExecutorService, long checkPeriod) {
        Assert.isTrue(checkPeriod >= 1000, "checkPeriod must greater than 1000.");

        this.msgProcessor = msgProcessor;
        this.gameExecutorService = gameExecutorService;
        this.checkPeriod = checkPeriod;
    }

    /**
     * 初始化方法<br />
     * <pre>
     *     1. 启动checkSuspendReadSessionRunner
     * </pre>
     */
    public void init() {
        gameExecutorService.scheduleTask(CHECK_SUSPEND_READ_SESSION_RUNNER, this.checkPeriod);
    }

    @Override
    public void sessionOpened(IoSession session) throws Exception {
        // 服务器状态为不开放时，直接关闭session
        if (!GameServerRuntime.isOpen()) {
            LOGGER_MSG.info("[sessionOpened] [opened session({}) will be closed].", session.getId());
            session.close(true);
            return;
        }
        // 初始化session
        LOGGER_GAME.info("[sessionOpened] [session open:{}].", session.getId());
        GameSession minaSession = createSession(session);
        session.setAttribute(SESSION_ATTR_ISESSION, minaSession);
        msgProcessor.put(new GameSessionOpenedMessage(minaSession));
    }

    @Override
    public void sessionClosed(IoSession session) throws Exception {
        LOGGER_GAME.info("[sessionClosed] [session close:{}].", session.getId());

        GameSession minaSession = (GameSession) session.getAttribute(SESSION_ATTR_ISESSION);
        if (minaSession != null) {
            session.removeAttribute(minaSession);
        }

        msgProcessor.put(new GameSessionClosedMessage(minaSession));
    }

    @Override
    public void messageReceived(IoSession session, Object message) throws Exception {
        // 当服务器状态为不开放时，除了内部消息(SysInternalMessage)，其他类型消息都不处理
        if (!GameServerRuntime.isOpen() && !(message instanceof SysInternalMessage)) {
            LOGGER_MSG.info("[messageReceived] [msg({}) not received when game server is not open].", message.getClass());
            return;
        }
        // 调用父方法将消息加入消息队列
        super.messageReceived(session, message);
        // 当服务器险制读&&主消息队列已满，则将该session暂停读取消息
        if (GameServerRuntime.isReadTrafficControl() && msgProcessor.isFull()
                && suspendReadSessions.putIfAbsent(session, Boolean.TRUE)) {
            session.suspendRead();
            LOGGER_GAME.debug("[messageReceived] [session.suspendRead({})].", session.getId());
        }
    }

    @Override
    public void sessionCreated(IoSession session) throws Exception {
    }

    @Override
    public void sessionIdle(IoSession session, IdleStatus status) throws Exception {
        // TODO 有需要在实现
    }

    @Override
    public void messageSent(IoSession session, Object message) throws Exception {
    }

    @Override
    protected GameSession createSession(IoSession session) {
        return new GameSession(session);
    }
}
